package cn.doitedu.datayi.etl

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.roaringbitmap.RoaringBitmap

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, FileOutputStream, ObjectOutputStream}
import scala.collection.mutable

object BitMapCubeConstruct {

  /**
   * Demo: computing distinct-visitor counts (UV) with RoaringBitmap so that a
   * fine-grained aggregate (province/city/region) can be rolled UP to coarser
   * levels (province/city, then province) directly from the aggregated table,
   * without re-scanning the raw detail data. A plain `count(distinct)` result
   * cannot be rolled up this way, which is the point of the comparison below.
   */
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .appName("bitmap层级上卷示例")
      .master("local")
      .config("spark.sql.shuffle.partitions","2")
      .getOrCreate()

    // Detail data: one event row per (province, city, region, userid, event).
    val df: DataFrame = spark.read
      .option("header", "true")
      .csv("data/bitmaptest")
      .selectExpr("province","city","region","cast(userid as int) as userid","event")
    df.createTempView("df")

    // Visitor count and event count per province/city/region.
    // Traditional approach: count(distinct) — not re-aggregatable at coarser levels.
    val pcr_agr_traditional = spark.sql(
      """
        |
        |select
        |  province,
        |  city,
        |  region,
        |  count(distinct userid) as uv_cnt,
        |  count(1) as ev_cnt
        |
        |from df
        |group by province,city,region
        |
        |""".stripMargin)

    // pcr_agr_traditional.show(100,false)
    /**
     * +--------+------+------+------+------+
       |province|city  |region|uv_cnt|ev_cnt|
       +--------+------+------+------+------+
       |湖北省  |襄阳市|郭靖区|3     |4     |
       |湖北省  |武汉市|汉阳区|3     |4     |
       |湖北省  |武汉市|东湖区|2     |4     |
       |湖北省  |襄阳市|黄蓉区|2     |3     |
       +--------+------+------+------+------+
     */

    // Same aggregation, but collecting the distinct user ids into an array —
    // an intermediate step on the way to the bitmap representation.
    val pcr_agr_userarray = spark.sql(
      """
        |
        |select
        |  province,
        |  city,
        |  region,
        |  collect_set(userid) as users,
        |  count(1) as ev_cnt
        |
        |from df
        |group by province,city,region
        |
        |""".stripMargin)
    //pcr_agr_userarray.show(100,false)

    /**
     * +--------+------+---------+---------+------+
       |province|city  |region   |users    |ev_cnt|
       +--------+------+---------+---------+------+
       |湖北省  |武汉市  |汉阳区    |[3, 1, 2]|4     |
       |湖北省  |武汉市  |东湖区    |[3, 1]   |4     |
       |湖北省  |襄阳市  |郭靖区    |[3, 1, 4]|4     |
       |湖北省  |襄阳市  |黄蓉区    |[1, 4]   |3     |
       +--------+------+--------+---------+------+
     */

    // Visitor count and event count per province/city/region, bitmap style.

    // UDF 1: turn an array of int user ids into the serialized bytes of a
    // RoaringBitmap. Parameter widened to Seq[Int] (instead of the concrete
    // mutable.WrappedArray[Int]) so the UDF works across Spark/Scala versions
    // where collect_set materializes as WrappedArray or ArraySeq alike.
    val arr2Bitmap = (arr: Seq[Int]) => {
      val bm: RoaringBitmap = RoaringBitmap.bitmapOf(arr.toArray: _*)

      // Serialize the bitmap into an in-memory byte stream.
      // (ByteArrayOutputStream.close() is a no-op, so no explicit cleanup needed.)
      val byteStream = new ByteArrayOutputStream()
      val dataStream = new DataOutputStream(byteStream)
      bm.serialize(dataStream)

      // Hand back the raw serialized bytes.
      byteStream.toByteArray
    }

    // UDF 2: given serialized bitmap bytes, return the bitmap's cardinality
    // (i.e. the number of distinct user ids it contains).
    val bm_card = (bytes: Array[Byte]) => {
      val bm: RoaringBitmap = RoaringBitmap.bitmapOf()

      // Deserialize the bytes back into a bitmap object.
      // Fix: wrap `bytes` directly — the previous `bytes.toArray` allocated a
      // useless copy of an Array[Byte] that is never mutated here.
      val byteStream = new ByteArrayInputStream(bytes)
      val dataStream = new DataInputStream(byteStream)
      bm.deserialize(dataStream)

      // Return the cardinality.
      bm.getCardinality
    }


    spark.udf.register("arr2bitmap",arr2Bitmap)
    spark.udf.register("bm_card",bm_card)

    // Base cube at the finest level: carries the uv bitmap itself (binary
    // column) alongside the counts, so it can be rolled up later.
    val pcr_agr_bitmap = spark.sql(
      """
        |
        |select
        |  province,
        |  city,
        |  region,
        |  arr2bitmap(collect_set(userid)) as uv_bitmap,
        |  bm_card(arr2bitmap(collect_set(userid)) ) as uv_cnt,
        |  count(1) as ev_cnt
        |
        |from df
        |group by province,city,region
        |
        |""".stripMargin)
    // pcr_agr_bitmap.printSchema()
    // pcr_agr_bitmap.show(100,false)

    /**
       root
          |-- province: string (nullable = true)
          |-- city: string (nullable = true)
          |-- region: string (nullable = true)
          |-- uv_bitmap: binary (nullable = true)
          |-- uv_cnt: integer (nullable = false)
          |-- ev_cnt: long (nullable = false)
     */
    /**
    +--------+------+------+-------------------------------------------------------------------+------+------+
     |province|city  |region|uv_bitmap                                                          |uv_cnt|ev_cnt|
     +--------+------+------+-------------------------------------------------------------------+------+------+
     |湖北省  |武汉市|汉阳区|[3A 30 00 00 01 00 00 00 00 00 02 00 10 00 00 00 01 00 02 00 03 00]   |3     |4     |
     |湖北省  |武汉市|东湖区|[3A 30 00 00 01 00 00 00 00 00 01 00 10 00 00 00 01 00 03 00]         |2     |4     |
     |湖北省  |襄阳市|郭靖区|[3A 30 00 00 01 00 00 00 00 00 02 00 10 00 00 00 01 00 03 00 04 00]   |3     |4     |
     |湖北省  |襄阳市|黄蓉区|[3A 30 00 00 01 00 00 00 00 00 01 00 10 00 00 00 01 00 04 00]         |2     |3     |
     +--------+------+------+-------------------------------------------------------------------+------+------+
     */

    /**
     * Level-by-level rollup (aggregate to province/city):
     * derive the coarser report directly from the finer-grained aggregate by
     * OR-ing the bitmaps (BitmapOrUDAF is a project-local Aggregator that
     * unions serialized bitmaps — defined elsewhere in this project).
     */
    pcr_agr_bitmap.createTempView("pcr_agr")
    spark.udf.register("bm_or",BitmapOrUDAF)
    val pc_agr = spark.sql(
      """
        |
        |select
        |   province,
        |   city,
        |   bm_or(uv_bitmap) as uv_bitmap,
        |   bm_card(bm_or(uv_bitmap)) as uv_cnt,
        |   sum(ev_cnt) as ev_cnt
        |from pcr_agr
        |group by province,city
        |
        |""".stripMargin)
    pc_agr.createTempView("pc_agr")
    pc_agr.printSchema()
    pc_agr.show(100,false)


    /**
     * Rollup 1 (aggregate to province):
     * derived from the finest-grained (province/city/region) aggregate.
     */
    val p_agr = spark.sql(
      """
        |
        |select
        |   province,
        |   bm_or(uv_bitmap) as uv_bitmap,
        |   bm_card(bm_or(uv_bitmap)) as uv_cnt,
        |   sum(ev_cnt) as ev_cnt
        |from pcr_agr
        |group by province
        |
        |""".stripMargin)
    p_agr.printSchema()
    p_agr.show(100,false)


    /**
     * Rollup 2 (aggregate to province):
     * derived from the intermediate (province/city) aggregate — demonstrates
     * that bitmaps roll up identically from any finer level.
     */
    val p_agr2 = spark.sql(
      """
        |
        |select
        |   province,
        |   bm_or(uv_bitmap) as uv_bitmap,
        |   bm_card(bm_or(uv_bitmap)) as uv_cnt,
        |   sum(ev_cnt) as ev_cnt
        |from pc_agr
        |group by province
        |
        |""".stripMargin)
    p_agr2.printSchema()
    p_agr2.show(100,false)


    spark.close()
  }

}
